java 并发之AQS

AbstractQueuedSynchronizer

  • AbstractQueuedSynchronizer 类继承自 AbstractOwnableSynchronizer,AbstractOwnableSynchronizer 主要维护了 exclusiveOwnerThread (独占模式的线程)
  • AbstractQueuedSynchronizer 为实现依赖 FIFO 等待队列的阻塞锁和相关的同步器提供一个框架.该类被设计为依赖单个原子int值表示状态的大多数同步器的一个有用的基础.子类必须定义改变该状态的受保护方法,以及这些方法定义了该状态在获取或释放该对象方面的含义.
  • 使用这个类作为同步器的基础,实现如下方法,使用 getState、setState 或compareAndSetState 方式适当的获取或修改同步状态.
    • tryAcquire
    • tryRelease
    • tryAcquireShared
    • tryReleaseShared
    • isHeldExclusively
  • AbstractQueuedSynchronizer.Node 等待队列节点类

    • 等待队列是 CLH 锁队列的一个变种
    • Node 类的属性如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      class Node {
      // 表示节点在共享模式下等待
      static final Node SHARED = new Node();
      // 表示节点在独占模式下等待
      static final Node EXCLUSIVE = null;
      // 表示线程已取消的 waitStatus 值
      static final int CANCELLED = 1;
      // 表示后继线程需要 unparking 的 waitStatus 值
      static final int SIGNAL = -1;
      // 表示线程在条件等待的 waitStatus 值
      static final int CONDITION = -2;
      // 指示下一个 acquireShared的waitStatus 值应该无条件传播
      static final int PROPAGATE = -3;
      /**
      * 状态指定,取如下值:
      * SIGNAL: 当前节点的后继节点被阻塞(通过park),所以当
      * 前节点释放或取消的时候需要unpark后继节点
      * CANCELLED: 当前节点由于超时或中断被取消
      * CONDITION: 当前节点在条件队列中
      * PROPAGATE: releaseShared 操作需要传播给其他节点
      * 0: 以上都不是
      * 值以数字的方式排列是为了简化使用.
      * 对于普通的同步节点,该字段的初始值为0,condition 节点初始值
      * 为 CONDITION
      */
      volatile int waitStatus;

      volatile Node prev;

      volatile Node next;

      volatile Thread thread;
      /**
      * 指向下一个条件等待的节点或特定值 SHARED. 因为条件队列仅出在
      * 独占模式,当节点处于条件等待时,我们仅需要一个简单的链表保存
      * 节点. 随后它们会被转移到 re-acquire 队列中. 因为条件等待必
      * 须是独占的,所以使用这个字段表示共享模式(见 isShared() 的判
      * 断逻辑).
      */
      Node nextWaiter;
      }
  • AbstractQueuedSynchronizer的主要属性如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * 等待队列的头结点,延迟初始化. 除了初始化,其更新操作都通过setHead方法.
    * 如果头结点存在,其状态保证不是 CANCELLED
    */
    private transient volatile Node head;
    /**
    * 等待队列的尾节点. 只有在添加新节点时通过 enq 进行更新
    */
    private transient volatile Node tail;
    // 同步状态
    private volatile int state;
  • CLH队列的入队操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // 尾节点为空,则表示队列为空,先初始化头结点
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }
  • 为当前线程创建节点并插入等待队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 尝试最快的方式入队,失败以后在执行完整的入队操作
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }
  • private void unparkSuccessor(Node node) 唤醒后继节点,找到当前节点后继节点中第一个状态不为取消状态的节点进行唤醒

  • private void doReleaseShared() 共享模式的释放操作
  • private void cancelAcquire(Node node) 取消正在进行的获取尝试
  • private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 检查和更新未能获取的节点的状态.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    /*
    * 当前节点已经设置状态,请求当前驱节点释放的时候通知它,因此可以安全的
    * park当前节点
    */
    return true;
    if (ws > 0) {
    /*
    * 前驱节点被取消了,跳过所有被取消的前驱节点
    */
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    /*
    * waitStatus 必须为0或 PROPAGATE,表示我们需要通知,但是不进行
    * park,调用者需要进行重试,确保在park之前不会进行获取
    */
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }
  • public final void acquire(int arg) 独占式获取操作,忽略中断,首先尝试获取,获取失败后将当前节点插入到等待队列,然后在循环中自旋获取,在获取过程中如果前驱节点的状态为 SIGNAL,则park当前线程,等待被唤醒,唤醒以后重新自旋获取

    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  • public final void acquireInterruptibly(int arg) 独占式获取,响应中断,如果在获取的过程中线程中断直接抛出InterruptedException

  • public final boolean tryAcquireNanos(int arg, long nanosTimeout) 超时获取,响应中断,超时失败返回
  • public final boolean release(int arg) 独占模式的释放操作
  • public final void acquireShared(int arg) 共享模式获取,不响应中断
  • public final void acquireSharedInterruptibly(int arg) 共享模式获取,响应中断
  • public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 共享模式获取,响应中断,响应超时
  • public final boolean releaseShared(int arg) 共享模式释放操作
  • ConditionObject 实现的实现 ReentrantLock 的 condition 就是基于此实现的,condition 的 wait、signal 操作必须在持有锁的情况下进行

    • condition 队列,condition 队列由以下两个应用标识头尾,并由 Node 的 nextWaiter 进行连接,condition 队列中的状态都是 Node.CONDITION

      1
      2
      3
      4
      // condition 队列的第一个节点
      private transient Node firstWaiter;
      // condition 队列的最后一个节点
      private transient Node lastWaiter;
    • public final void await() wait 操作,主要由以下步骤组成

      • 将当前线程添加到 condition 对于列
      • 释放当前线程持有的锁
      • 循环判断当前节点是否在同步等待队列中,如果不在同步等待队列中则执行 LockSupport.park 操作挂起当前线程
      • 当前线程被唤醒且关联的节点在同步等待队列中则执行后续操作
      • 重新去获取锁,并继续往后执行
    • public final void signal() signal 操作
      • 执行 signal 的线程必须独占的持有锁
      • 将 condition 队列中的第一个节点移动到同步等待队列的尾部